Получение данных о смещении
Редактировал(а) Alexandr Fokin 2023/05/10 11:40
Задание: Получение информации о текущем состоянии параметров:
Опционально по каждому partition. |
Get Consumer lag using LibrdkafkaHandle.OutQueueLength Aug 2019 - Kafka Consumer Lag programmatically |
Решение: Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition. Данные по topic формируются через сумму данных всех partitions, входящих в него. |
//номер смещения последнего сообщения в очереди consumer .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout) .High //Получить активное смещение (последнее прочитанное сообщение) consumer .Committed( new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) }, ConnectionParamsEntity.ActionTimeout ) .FirstOrDefault() .Offset .Value; |